add SQL adapter #779

Merged
merged 54 commits into bluesky:main from add_sql_adapter on Feb 14, 2025

Conversation

skarakuzu
Contributor

@skarakuzu skarakuzu commented Aug 22, 2024

preliminary start of sql adapter. to be continued ...

Checklist

  • Add a Changelog entry
  • Add the ticket number which this PR closes to the comment section

@danielballan
Member

Lifecycle:

  1. The client declares that it wants to create a new tabular dataset, via a request POST /api/v1/metadata/my_table.
  2. In the "catalog" SQL database, the server adds a row to the nodes table with any metadata about this table. This is how the new table is connected to any overall dataset, like a Bluesky scan and its Scan ID.
  3. Also in the "catalog" SQL database, the server adds a row each to the data_sources table and the assets table. Together, they describe how to locate where the new data will be saved. The Asset part is very locked down: it has room for the URI of the tabular SQL database (postgresql://...) and some boilerplate. The DataSource has a freeform area called parameters, which can hold any JSON. We can use this for dataset-specific details, like the name of the SQL table (table_name)---derived from the Arrow schema in this case---and a means of selecting the rows of interest for this new dataset (dataset_id).
  4. When data is written or read, a SQLAdapter object is instantiated inside the server. It is passed information extracted from this DataSource and Asset, so it knows the table_name and the dataset_id (sketched below).
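
A rough sketch of the DataSource/Asset records described in steps 3 and 4. The field names and values here are illustrative assumptions, not the exact catalog schema:

# Hypothetical shape of the DataSource/Asset pair described above.
asset = {
    "data_uri": "postgresql://...",  # URI of the tabular SQL database, plus boilerplate
}
data_source = {
    "mimetype": "application/x-tiled-sql-table",
    "parameters": {  # freeform JSON area
        "table_name": "table_blahblahblah",  # derived from the Arrow schema
        "dataset_id": 12345,                 # selects the rows belonging to this dataset
    },
    "assets": [asset],
}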

@danielballan
Member

danielballan commented Dec 13, 2024

Test script:

import pandas
from tiled.client import from_uri
from tiled.structures.core import StructureFamily
from tiled.structures.data_source import Asset, DataSource, Management
from tiled.structures.table import TableStructure

client = from_uri("http://localhost:8000", api_key="secret")

df = pandas.DataFrame({"a": [1, 2, 3], "b": [1., 2., 3.]})
structure = TableStructure.from_pandas(df)

x = client.new(
    structure_family=StructureFamily.table,
    data_sources=[
        DataSource(
            management=Management.writable,
            mimetype="application/x-tiled-sql-table",
            structure_family=StructureFamily.table,
            structure=structure,
            assets=[],
        ),
    ],
    metadata={},
    specs=[],
    key="x",
)
# x.write(df)  # superseded by append_partition below
x.append_partition(df, 0)

# This does not work yet
# x.read()  # calls /table/partition/x?partition=0 adapter.read_partition()

@skarakuzu skarakuzu force-pushed the add_sql_adapter branch 2 times, most recently from 7913020 to 75b2ddc on January 15, 2025 17:56
@danielballan
Member

danielballan commented Jan 16, 2025

For this PR

  • Add dataset_id column and filter by it.
  • Create table eagerly.
  • In Adapter, remove write. Write would mean "overwrite" or "replace" and we are not sure we want to expose this. (We can add it later if we want it.)
  • In client, replace write_appendable_dataframe with create_appendable_dataframe. This will run the self.new(...) call, which runs init_storage on the server side, but it will not take any data. Data will be appended in later calls.
  • In Adapter, I removed append and used append_partition. (For now it's stuck at partition=0 but this constraint will be temporary.) Tests need to be updated.
  • Execute CREATE INDEX IF NOT EXISTS ... on the dataset_id column.
  • Pandas indexes should round-trip. (Dan)
  • Protect against SQL injection. In init_storage, table_name should match some restrictive regex pattern. Maybe lowercase letters, numbers, and underscores? (See the sketch after this list.)
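
One possible shape for that table_name check. The exact pattern and function name are assumptions; the item above only asks for "some restrictive regex":

import re

# Assumed pattern: lowercase letters, digits, and underscores, starting with
# a letter. The real rule is whatever the PR finally settles on.
TABLE_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]*$")

def validate_table_name(table_name: str) -> str:
    "Reject table names that could smuggle SQL into interpolated queries."
    if not TABLE_NAME_PATTERN.match(table_name):
        raise ValueError(f"Invalid table name: {table_name!r}")
    return table_name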

Intended usage now looks like...

The following prompts the server to:

  1. Generate a table_name from the schema hash; see the hashing sketch below. (The table might or might not already exist, containing rows from other dataset_ids.)
  2. Generate a new unique dataset_id for this dataset.
  3. Store the table_name, dataset_id, and any metadata passed here in the catalog database.
# This uploads no data.
x = client.create_appendable_table(schema, key="x")
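
For step 1, the table_name might be derived from a hash of the Arrow schema along these lines. This is a sketch only; the hash function and naming convention are assumptions:

import hashlib

import pyarrow

def table_name_from_schema(schema: pyarrow.Schema) -> str:
    # Hash a serialized form of the Arrow schema so that datasets with
    # identical schemas land in the same SQL table.
    digest = hashlib.md5(schema.serialize().to_pybytes()).hexdigest()
    return f"table_{digest}"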

The following prompts the server to:

  1. Create the table {table_name} if it does not yet exist.
  2. Ingest the rows into that table, with an additional dataset_id column (a rough SQL sketch follows below).
# Now data can be added, potentially in parallel.
x.append_partition(df, 0)
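
Roughly, the statements behind that append might look like the following, using the columns from the example df above. The exact DDL, column types, and index name are assumptions, not what the adapter literally emits:

# Hypothetical SQL issued by the adapter on (first) append; illustrative only.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS table_blahblahblah (
    dataset_id INTEGER,
    a BIGINT,
    b DOUBLE PRECISION
)
"""
# Index name below is hypothetical.
CREATE_INDEX = (
    "CREATE INDEX IF NOT EXISTS table_blahblahblah_dataset_id "
    "ON table_blahblahblah (dataset_id)"
)
INSERT_ROWS = (
    "INSERT INTO table_blahblahblah (dataset_id, a, b) "
    "VALUES (:dataset_id, :a, :b)"
)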

In a separate process, this would also work. We can access an existing table and keep appending.

x = client["x"]
x.append_partition(df, 0)

In follow-up PRs...

  • Support PG database with credentials.
  • Connection pooling
  • Support more than one partition. SQL will scale fine to a large table, but current Tiled does not let the client request less than a full partition. We either need to change that and let users request row ranges (seems complicated, especially with Parquet... so I think it might be something to wait on) or mark up the data in the SQL table as belonging to reasonably-sized partitions. Similar to how arrays are chunked by the client, table rows should be partitioned.

Maybe in the future partitions will be added like this? Not sure whether PostgreSQL's native "table partitioning" fits our use case.

# table_blahblahblah
dataset_id  partition_id  ...
12345       1
12345       1
12345       2
12345       3
12345       3
12345       3
24323
def read_partition(self, partition):
    query = f"SELECT * FROM {self.table_name} WHERE dataset_id={self.dataset_id} AND partition={partition}"
    ...
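
Per the SQL-injection item above, the row filters would presumably be bound parameters rather than interpolated into the string, e.g. (sketch only):

def read_partition(self, partition):
    # table_name was validated against a restrictive pattern at init_storage
    # time; dataset_id and partition are passed as bound parameters.
    query = (
        f"SELECT * FROM {self.table_name} "
        "WHERE dataset_id = :dataset_id AND partition = :partition"
    )
    params = {"dataset_id": self.dataset_id, "partition": partition}
    ...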

@danielballan danielballan force-pushed the add_sql_adapter branch 2 times, most recently from 60c6f2c to 0c0473e on February 12, 2025 15:30
Seher Karakuzu and others added 11 commits February 12, 2025 10:33
preliminary start of sql adapter. to be continued ...

hashed table names. to be continued...

modified hashing and added a test for sqlite database. to be continued

try TILED_TEST_POSTGRESQL_URI usage

fix postgreql uri

Automatically set SQL driver if unset.

Do not require env var to be set.

Consistently use database URI with schema.

Refactor init_storage interface for SQL.

More adapters updated

More adapters updated

Parse uri earlier.

Use dataclass version of DataSource.

Begin to update SQLAdapter.

Fix import

Typesafe accessor for Storage

few changes

Basic write and append works

Do not preserve index.

changes in test_sql.py

latest changes

tried to fix the tests

removed prints

Remove vestigial comment.

Extract str path from sqlite URI

Use unique temp dir and clean it up.

some more fixing and addition of partitions

fixing docstrings

CLI works with SQL writing

Tests pass again

Add convenience method write_appendable_dataframe.

Fix typo

Fix path handling for Windows

The dataset_id concept is mostly implemented

Fix conditional

Support appendable tables with --temp catalog

Revert order swap (for now)
@danielballan danielballan merged commit 819d194 into bluesky:main Feb 14, 2025
8 checks passed
Review thread on def append_partition in the SQL adapter:
Contributor

Just trying to use this adapter to write tabular data. Is there a reason it only has the append_partition method, not write_partition or write like the rest of the writable adapters (at least the CSV and Parquet adapters seem to have them)? I think it would be nice to unify and standardize the interfaces of all writable adapters, similarly to how we have done this with the .from_catalog and .from_uris methods, to make it less confusing for the user. @skarakuzu

Contributor Author

It was implemented initially but removed later. If I remember correctly, it is because write or write_partition would mean deleting the existing table and starting a new one, and we do not want to delete the tables.

Contributor

Ah, I see. Thank you for explaining this, Seher.
